fix: Apply timeout correctly to consumeNum #330
base: master
Conversation
Pull Request Overview
This PR updates `consumeNum` to apply the overall batch timeout rather than resetting it per message, ensuring the loop only uses the remaining batch time for each `Consume` call.
- Introduce a `steady_clock` start timestamp and calculate the remaining timeout each iteration (see the sketch after this list)
- Guard the minimum timeout to allow non-blocking/fast-exit scenarios
- Add a `<chrono>` include for timing functions
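A minimal sketch of that pattern, assuming a loop shape like the one described for `src/workers.cc` (the `Message` type and `consume_one` callback below are illustrative stand-ins, not the actual librdkafka calls):

```cpp
#include <chrono>
#include <functional>
#include <iostream>
#include <thread>
#include <vector>

// Hypothetical message type and consume callback, standing in for the real
// librdkafka calls in src/workers.cc.
struct Message { int id; };
using ConsumeFn = std::function<Message*(int)>;

// Consume up to `count` messages, bounded by a single batch-wide timeout.
std::vector<Message> ConsumeNum(int count, int timeout_ms,
                                const ConsumeFn& consume_one) {
  std::vector<Message> batch;
  const auto start = std::chrono::steady_clock::now();
  int remaining_ms = timeout_ms;

  for (int i = 0; i < count; ++i) {
    // Guard: for timeout_ms <= 1, keep the caller's value so non-blocking (0)
    // and minimal-delay (1) consumes still exit immediately.
    if (timeout_ms > 1) {
      const auto elapsed =
          std::chrono::duration_cast<std::chrono::milliseconds>(
              std::chrono::steady_clock::now() - start);
      remaining_ms = timeout_ms - static_cast<int>(elapsed.count());
      if (remaining_ms <= 0) break;  // batch budget exhausted
    }
    // Each call may only use what is left of the batch budget.
    Message* msg = consume_one(remaining_ms);
    if (msg == nullptr) break;  // timed out waiting for this message
    batch.push_back(*msg);
  }
  return batch;
}

int main() {
  // Simulate a topic that yields one message every 50 ms.
  Message storage{0};
  auto slow_consume = [&](int timeout_ms) -> Message* {
    const int arrival_ms = 50;
    if (timeout_ms < arrival_ms) return nullptr;  // would time out first
    std::this_thread::sleep_for(std::chrono::milliseconds(arrival_ms));
    storage.id++;
    return &storage;
  };
  // With a 1000 ms batch budget the loop returns ~20 messages, not 100.
  std::cout << ConsumeNum(100, 1000, slow_consume).size() << "\n";
}
```

The `timeout_ms > 1` guard mirrors the nitpick below: recalculation is skipped for very small timeouts so non-blocking (`0`) and minimal-delay (`1`) consumes keep their fast-exit semantics.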
Comments suppressed due to low confidence (2)
src/workers.cc:821
- [nitpick] Clarify what ‘early exits’ means and why the threshold is 1 ms. For example: `// Skip recalculation for timeout ≤ 1 ms to allow immediate non-blocking or minimal-delay consumes`.

`// Allow timeout_ms = 1 early exits to work`
src/workers.cc:824
- [nitpick] Rename variable `now` to `current_time` or `now_timepoint` to make its role more explicit.
auto now = std::chrono::steady_clock::now();
I've not reviewed the PR completely yet, but this bug has been there long enough that I assume someone might have started accidentally relying on this behaviour. Could you make this change conditional on calling a setter before consuming? Similar to how "setDefaultIsTimeoutOnlyForFirstMessage" works currently in the code. We can keep it 'false' for now and make it 'true' when we do a major version release. Besides that, could you add a failing test case to the automated tests that passes after this change, too? Let me know if you have any issues adding that. Thanks a lot for this PR!
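A rough sketch of the kind of opt-in gating being requested, in the spirit of the existing `setDefaultIsTimeoutOnlyForFirstMessage` toggle (the flag and setter names below are hypothetical; the actual names in the codebase may differ):

```cpp
// Hypothetical opt-in flag; defaults to the legacy per-message timeout so
// existing users are unaffected until a major version flips the default.
class ConsumerConfig {
 public:
  void setTimeoutIsForEntireBatch(bool value) { batch_wide_timeout_ = value; }
  bool timeoutIsForEntireBatch() const { return batch_wide_timeout_; }

 private:
  bool batch_wide_timeout_ = false;  // legacy behavior by default
};

// Inside the consume loop the flag would select which timeout each call gets:
//   int per_call_ms = config.timeoutIsForEntireBatch()
//                         ? remaining_ms   // new: shared batch budget
//                         : timeout_ms;    // old: full timeout per message
```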
fix timeout
Sure sure, that makes sense lol. Updated so the new behavior is opt-in. Also added two e2e tests that spec both behaviors as desired. Let me know what you think! 🙏
What
Applies the timeout to the entire batch instead of on a per-message basis.
How
The individual `consume` calls share the batch timeout, and each call may only use the time the batch has remaining.
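Concretely, assuming the loop records when the batch started, each call's budget could be derived with a helper along these lines (illustrative, not the actual `workers.cc` code):

```cpp
#include <algorithm>
#include <chrono>

// Timeout a single consume call may use, given the batch-wide budget and the
// time point at which the batch started (names are hypothetical).
int RemainingBatchTimeoutMs(int batch_timeout_ms,
                            std::chrono::steady_clock::time_point batch_start) {
  const auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
      std::chrono::steady_clock::now() - batch_start);
  return std::max(0, batch_timeout_ms - static_cast<int>(elapsed.count()));
}
```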
Why
The existing behavior is described in detail in the issue linked below (#262).
In short, for `d` millisecond delay, `c` count, and `b` blocking time: `b = d * c`, given a constant topic rpm of `60000 / d`. Given any rpm `> 60000 / d`: `b = c * (60 / rpm)` seconds.

For example, a 1000ms delay with a batch count of 100 will cause the `consumeNum` loop to block for up to 100 seconds given a constant topic rpm of 60.
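As a worked check of those formulas, using only the numbers already in this PR:

$$
b =
\begin{cases}
d \cdot c \ \text{ms} & \text{if rpm} = 60000/d \\
c \cdot \frac{60}{\text{rpm}} \ \text{s} & \text{if rpm} > 60000/d
\end{cases}
$$

With `d = 1000` and `c = 100`: at `rpm = 60` (exactly `60000/1000`), `b = 1000 * 100 ms = 100 s`, matching the example above; at `rpm = 700`, `b = 100 * (60 / 700) ≈ 8.57 s`, matching the roughly 8.6 seconds expected in Test & Review below.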
References
Issue: #262
PR that introduced the behavior: Blizzard/node-rdkafka#34
PRs in node-rdkafka addressing the same issue:
Blizzard/node-rdkafka#1061
Blizzard/node-rdkafka#1053
Test & Review
With c = 100, d = 1000, and rpm = 700, the expectation is that pre-change we'd block for about 8.6 seconds (it seems to take longer in the example, though) before returning the 100 messages.
My testing environment has a 50ms cooldown on calling `consume`, so after the change we expect a batch of messages to be returned every 1050ms.

Pre-Change

Post-Change
